Cloud WorkflowsでBigQueryのクエリ結果を使ってみる
はじめに
データアナリティクス事業本部のkobayashiです。
GoogleCloudのWorkflowsでBigQueryを扱うブログを何本か書きましたが今回はBigQueryへクエリを実行してその結果を扱ってみたいと思います。
WorkflowsでBigQueryのクエリ結果を扱う
Workflowsで使うことのできるBigQueryコネクタの中でgoogleapis.bigquery.v2.jobs.queryを使うことでBigQueryでクエリを実行した結果をWorkflows中で扱うことができます。
具体的には実行結果はgoogleapis.bigquery.v2.jobs.queryを実行したStepのresultとして出力でき、それを後続の処理で扱う事ができます。クエリ結果はQueryResponse型 で帰ってきますのでこの中から必要な値を取り出して使います。
QueryResponseでは代表として以下のような値があリます。
- erorrs
- クエリ実行中に発生したエラーメッセージ
- jobComplete
- クエリ実行が完了したかどうか
- rows
- クエリ結果
- schema
- クエリ結果のスキーマ
- totalRows
- クエリ結果セットの合計行数
他にも取得できる値があるので詳しくは公式ドキュメントをご確認ください。
- googleapis.bigquery.v2.jobs.query | Workflows | Google Cloud
- QueryResponse | Workflows | Google Cloud
今回は一般公開データセットの「Google Trends - International」からサンプルのSQLにある国別に最も検索されている検索語句を取得できる「What are the top search terms across the globe for the latest available data?」のクエリを使ってみたいと思います。
Workflowsを実行する
以下がWorkflowsのyamlになります。内容は以下のようになります。
ExecBqQueryステップでサンプルクエリを実行しその結果をquery_result
に入れています。後続の各ステップではクエリ結果の入ったquery_result
から値をlogに出力しています。
query_result
はQueryResponse型 になるのでQueryResponse型のドキュメントを確認しつつ出力する値を指定します。
main: params: [ args ] steps: - init: assign: - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - ExecBqQuery: call: googleapis.bigquery.v2.jobs.query args: projectId: ${project_id} body: query: | SELECT country_name, term, ARRAY_AGG(STRUCT(rank,week) ORDER BY week DESC, refresh_date DESC LIMIT 1) x FROM `bigquery-public-data.google_trends.international_top_terms` WHERE refresh_date = (SELECT MAX(refresh_date) FROM `bigquery-public-data.google_trends.international_top_terms`) AND week = (SELECT MAX(week) FROM `bigquery-public-data.google_trends.international_top_terms` ) GROUP BY country_name, term ORDER BY (SELECT country_name FROM UNNEST(x)), (SELECT rank FROM UNNEST(x)) useLegacySql: false result: query_result - query_result_schema: call: sys.log args: # クエリ結果のスキーマ text: ${query_result.schema} severity: INFO - query_result_totalrows: call: sys.log args: # クエリ結果の件数 text: ${query_result.totalRows} severity: INFO - query_result_rows: call: sys.log args: # クエリ結果の1行目の1列目(country_name)の値 text: ${query_result.rows[0].f[0].v} severity: INFO - query_result_rows3: call: sys.log args: # クエリ結果の1行目の値 text: ${query_result.rows[0]} severity: INFO - query_result_rows4: call: sys.log args: # クエリ結果全体 text: ${query_result.rows} severity: INFO - query_result_queryresult: call: sys.log args: # QueryResponseすべて text: ${query_result} severity: INFO
ではこのコードを実行してみます。
$ gcloud workflows deploy bq_query --source=bq_query.yml --service-account bq_query@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1 $ gcloud workflows run bq_query --location asia-northeast1
Workflowsの実行が成功するとログにそれぞれ出力結果が表示されます。
# クエリ結果のスキーマ {"fields":[{"mode":"NULLABLE","name":"country_name","type":"STRING"},{"mode":"NULLABLE","name":"term","type":"STRING"},{"fields":[{"mode":"NULLABLE","name":"rank","type":"INTEGER"},{"mode":"NULLABLE","name":"week","type":"DATE"}],"mode":"REPEATED","name":"x","type":"RECORD"}]} # クエリ結果の件数 1023 # クエリ結果の1行目の1列目(country_name)の値 Argentina # クエリ結果の1行目の値 {"f":[{"v":"Argentina"},{"v":"Polonia"},{"v":[{"v":{"f":[{"v":"1"},{"v":"2022-11-06"}]}}]}]} # クエリ結果全体 [{"f":[{"v":"Argentina"},{"v":"Polonia"},{"v":[{"v":{"f":[{"v":"1"},{"v":"2022-11-06"}]}}]}]},{"f":[{"v":"Argentina"},{"v":"Sanatorio Finochietto"},{"v":[{"v":{"f":[{"v":"2"},{"v":"2022-11-06"}]}}]}]},{"f":[{"v":"Argentina"},{"v":"A Q hora juega Argentina"},{"v":[{"v":{"f":[{"v":"3"},{"v":"2022-11-06"}]}}]}]},...] # QueryResponseすべて {"cacheHit":true,"jobComplete":true,"jobReference":{"jobId":"job_Rp_WyYBSPaychMVqYEBvjw2Cnxs3","location":"US","projectId":"kobayashi-masahiro"},"kind":"bigquery#queryResponse","rows":[{"f":[{"v":"Argentina"},{"v":"Polonia"},{"v":[{"v":{"f":[{"v":"1"},{"v":"2022-11-06"}]}}]}]},...
クエリ結果がjson型式で出力されていることがわかります。rows全体も出力していますが、これは最大応答サイズで切られてしまいますので厳密に全件を使うことはできません。追加の行がある場合はQueryResponseのjobReference
に含まれるjobId,location
を指定してgoogleapis.bigquery.v2.jobs.getQueryResults
コネクタを使うことで取得できます。ただし、Workflowsで最大応答サイズを超えたすべてのクエリ結果を使うことはほぼないと思います。
Method: googleapis.bigquery.v2.jobs.getQueryResults | Workflows | Google Cloud
まとめ
Cloud WorkflowsでBigQueryへクエリを実行してその結果を扱ってみました。クエリ結果全体は使うことはないと思いますが、クエリ結果の件数やエラーなどに応じて後続の処理を分岐させるといった使い方で活躍するのではないでしょうか。
最後まで読んで頂いてありがとうございました。